package cn.tannn.tregistry.server.sub;

import cn.tannn.tregistry.core.service.TRegistryService;
import cn.tannn.tregistry.core.subscribe.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;

/**
 * Static helpers for building server-sent-event (SSE) streams: a keep-alive heartbeat
 * wrapper and periodic per-service notifications.
 *
 * @see <a href="https://juejin.cn/post/7202445722972635193">Reference article</a>
 * @author <a href="https://tannn.cn/">tnnn</a>
 * @version V1.0
 * @date 2024/4/24 7:12 PM
 */
@Slf4j
public class MessageGenerate {

    /**
     * Merges a periodic heartbeat into {@code data} so that an open SSE connection keeps
     * receiving traffic even while {@code data} is idle.
     *
     * <p>Each heartbeat is a {@link ServerSentEvent} with the comment {@code "keep alive"},
     * the id set to {@code service}, and no data payload. Because the heartbeat is an
     * unbounded interval, the merged stream does not complete just because {@code data}
     * completes; it ends when the subscriber cancels or a source signals an error.
     *
     * @param duration interval between two heartbeat events
     * @param data     the payload stream to keep alive
     * @param service  service name, used as the heartbeat event id and in the close log line
     * @param <T>      element type of {@code data}
     * @return a stream that interleaves the heartbeat events with the elements of {@code data};
     *         the return type is intentionally left raw so existing callers that assign the
     *         result to a parameterized {@code Flux} keep compiling
     */
    @SuppressWarnings("rawtypes") // raw return type kept for backward compatibility with callers
    public static <T> Flux keepAlive(Duration duration, Flux<T> data, String service) {
        // Comment-only events: they carry no payload and exist purely to keep the connection open.
        Flux<ServerSentEvent<T>> heartBeat = Flux.interval(duration)
                .map(tick -> ServerSentEvent.<T>builder()
                        .comment("keep alive")
                        .id(service)
                        .build())
                .doFinally(signalType -> log.info("Heartbeat closed for service: {}", service));
        return Flux.merge(heartBeat, data);
    }

    /**
     * Builds a single notification event for {@code service}.
     *
     * <p>The event id is {@code service}; the payload is a {@link MessageEvent} created from
     * {@code service} and the value currently stored for it in {@link TRegistryService#VERSIONS}.
     *
     * @param service service name
     * @return the notification event
     */
    public static ServerSentEvent<MessageEvent> generateNotification(String service) {
        return ServerSentEvent.<MessageEvent>builder()
                .id(service)
                .data(new MessageEvent(service, TRegistryService.VERSIONS.get(service)))
                .build();
    }


    /**
     * Starts a generator that pushes a notification for {@code service} into {@code sink}
     * every 2 seconds until the sink is cancelled.
     *
     * <p>The pipeline is subscribed before this method returns and the resulting subscription
     * handle is not kept, so cancelling {@code sink} is the only way to stop the generator.
     * Cancellation is observed on the next tick, i.e. up to 2 seconds after it happens.
     *
     * @param sink    sink of the shared stream that receives the notifications
     * @param service service name
     */
    public static void generateNotifications(FluxSink<ServerSentEvent<MessageEvent>> sink, String service) {
        Flux.interval(Duration.ofSeconds(2))
                // Check cancellation BEFORE doing any work, so that nothing is generated,
                // pushed or logged as "sent" once the sink has been cancelled.
                .takeWhile(tick -> !sink.isCancelled())
                .map(tick -> generateNotification(service))
                .doOnNext(event -> {
                    sink.next(event); // forward the notification to the shared stream
                    log.info("Sent for {}", event.data());
                })
                .doFinally(signalType -> log.info("Notification flux closed"))
                .subscribe();
    }
}
